自主规划算法概述#
自主规划算法是Agentic AI系统的核心能力之一,它允许AI系统根据目标自主分解任务、制定执行计划、监控执行进度,并根据实际情况调整计划。
规划算法的基本概念#
1. 什么是自主规划#
自主规划是指AI系统在没有人类干预的情况下,根据给定的目标,自动生成和执行任务计划的过程。
自主规划的特点:
- 目标驱动: 以目标为导向进行规划
- 任务分解: 将复杂目标分解为可执行的子任务
- 动态调整: 根据执行情况动态调整计划
- 资源管理: 合理分配和利用资源
2. 规划算法分类#
| 算法类型 | 特点 | 适用场景 |
|---|
| 前向搜索 | 从初始状态到目标状态 | 目标明确,状态空间小 |
| 后向搜索 | 从目标状态到初始状态 | 目标状态明确,初始状态复杂 |
| 双向搜索 | 同时从两端搜索 | 状态空间大,两端明确 |
| 层次规划 | 分层规划,逐步细化 | 复杂任务,多层级目标 |
| 部分规划 | 规划部分路径,边执行边规划 | 动态环境,不确定性高 |
任务分解算法#
1. 层次任务分解#
示例:层次任务分解
用户请求:
"实现一个层次任务分解算法"
Claude Code 生成的代码:
````python
from typing import Dict, List, Any, Optional
from dataclasses import dataclass, field
from enum import Enum
import logging
logger = logging.getLogger(__name__)
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
class TaskPriority(Enum):
"""任务优先级"""
CRITICAL = 1
HIGH = 2
MEDIUM = 3
LOW = 4
@dataclass
class Task:
"""任务"""
id: str
name: str
description: str
status: TaskStatus = TaskStatus.PENDING
priority: TaskPriority = TaskPriority.MEDIUM
dependencies: List[str] = field(default_factory=list)
subtasks: List['Task'] = field(default_factory=list)
estimated_duration: Optional[float] = None
actual_duration: Optional[float] = None
result: Optional[Any] = None
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
class HierarchicalTaskPlanner:
"""层次任务规划器"""
def __init__(self):
self.tasks: Dict[str, Task] = {}
self.task_graph: Dict[str, List[str]] = {}
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
self.task_graph[task.id] = task.dependencies
logger.info(f"Task added: {task.id}")
def decompose_task(self, task_id: str, max_depth: int = 3) -> List[Task]:
"""分解任务"""
task = self.tasks.get(task_id)
if not task:
raise ValueError(f"Task not found: {task_id}")
if max_depth <= 0:
return [task]
# 生成子任务
subtasks = self._generate_subtasks(task)
# 递归分解子任务
all_tasks = [task]
for subtask in subtasks:
self.add_task(subtask)
task.subtasks.append(subtask)
all_tasks.extend(self.decompose_task(subtask.id, max_depth - 1))
# 更新任务图
self.task_graph[task.id] = [subtask.id for subtask in subtasks]
return all_tasks
def _generate_subtasks(self, task: Task) -> List[Task]:
"""生成子任务"""
subtasks = []
# 根据任务类型生成子任务
task_type = task.metadata.get('type', 'default')
if task_type == 'code_generation':
subtasks = self._generate_code_generation_subtasks(task)
elif task_type == 'code_review':
subtasks = self._generate_code_review_subtasks(task)
elif task_type == 'testing':
subtasks = self._generate_testing_subtasks(task)
elif task_type == 'deployment':
subtasks = self._generate_deployment_subtasks(task)
else:
subtasks = self._generate_default_subtasks(task)
return subtasks
def _generate_code_generation_subtasks(self, task: Task) -> List[Task]:
"""生成代码生成子任务"""
subtasks = [
Task(
id=f"{task.id}_analyze_requirements",
name="Analyze Requirements",
description="Analyze the requirements for code generation",
priority=TaskPriority.HIGH,
metadata={'type': 'analysis'}
),
Task(
id=f"{task.id}_design_architecture",
name="Design Architecture",
description="Design the system architecture",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_analyze_requirements"],
metadata={'type': 'design'}
),
Task(
id=f"{task.id}_generate_code",
name="Generate Code",
description="Generate the code",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_design_architecture"],
metadata={'type': 'code_generation'}
),
Task(
id=f"{task.id}_review_code",
name="Review Code",
description="Review the generated code",
priority=TaskPriority.MEDIUM,
dependencies=[f"{task.id}_generate_code"],
metadata={'type': 'code_review'}
)
]
return subtasks
def _generate_code_review_subtasks(self, task: Task) -> List[Task]:
"""生成代码审查子任务"""
subtasks = [
Task(
id=f"{task.id}_static_analysis",
name="Static Analysis",
description="Perform static code analysis",
priority=TaskPriority.HIGH,
metadata={'type': 'analysis'}
),
Task(
id=f"{task.id}_security_check",
name="Security Check",
description="Check for security vulnerabilities",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_static_analysis"],
metadata={'type': 'security'}
),
Task(
id=f"{task.id}_performance_check",
name="Performance Check",
description="Check for performance issues",
priority=TaskPriority.MEDIUM,
dependencies=[f"{task.id}_static_analysis"],
metadata={'type': 'performance'}
),
Task(
id=f"{task.id}_generate_report",
name="Generate Report",
description="Generate review report",
priority=TaskPriority.MEDIUM,
dependencies=[
f"{task.id}_security_check",
f"{task.id}_performance_check"
],
metadata={'type': 'reporting'}
)
]
return subtasks
def _generate_testing_subtasks(self, task: Task) -> List[Task]:
"""生成测试子任务"""
subtasks = [
Task(
id=f"{task.id}_unit_tests",
name="Unit Tests",
description="Write and run unit tests",
priority=TaskPriority.HIGH,
metadata={'type': 'testing'}
),
Task(
id=f"{task.id}_integration_tests",
name="Integration Tests",
description="Write and run integration tests",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_unit_tests"],
metadata={'type': 'testing'}
),
Task(
id=f"{task.id}_e2e_tests",
name="E2E Tests",
description="Write and run end-to-end tests",
priority=TaskPriority.MEDIUM,
dependencies=[f"{task.id}_integration_tests"],
metadata={'type': 'testing'}
)
]
return subtasks
def _generate_deployment_subtasks(self, task: Task) -> List[Task]:
"""生成部署子任务"""
subtasks = [
Task(
id=f"{task.id}_build",
name="Build",
description="Build the application",
priority=TaskPriority.HIGH,
metadata={'type': 'build'}
),
Task(
id=f"{task.id}_test",
name="Test",
description="Run tests",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_build"],
metadata={'type': 'testing'}
),
Task(
id=f"{task.id}_deploy",
name="Deploy",
description="Deploy to production",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_test"],
metadata={'type': 'deployment'}
),
Task(
id=f"{task.id}_verify",
name="Verify",
description="Verify deployment",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_deploy"],
metadata={'type': 'verification'}
)
]
return subtasks
def _generate_default_subtasks(self, task: Task) -> List[Task]:
"""生成默认子任务"""
subtasks = [
Task(
id=f"{task.id}_prepare",
name="Prepare",
description="Prepare for task execution",
priority=TaskPriority.HIGH,
metadata={'type': 'preparation'}
),
Task(
id=f"{task.id}_execute",
name="Execute",
description="Execute the task",
priority=TaskPriority.HIGH,
dependencies=[f"{task.id}_prepare"],
metadata={'type': 'execution'}
),
Task(
id=f"{task.id}_finalize",
name="Finalize",
description="Finalize the task",
priority=TaskPriority.MEDIUM,
dependencies=[f"{task.id}_execute"],
metadata={'type': 'finalization'}
)
]
return subtasks
def get_execution_plan(self, task_id: str) -> List[Task]:
"""获取执行计划"""
plan = []
visited = set()
def dfs(current_task_id):
if current_task_id in visited:
return
visited.add(current_task_id)
先执行依赖任务
for dep_id in self.task_graph.get(current_task_id, []):
dfs(dep_id)
添加当前任务
task = self.tasks.get(current_task_id)
if task:
plan.append(task)
dfs(task_id)
return plan
def visualize_plan(self, task_id: str) -> str:
"""可视化执行计划"""
plan = self.get_execution_plan(task_id)
visualization = "Execution Plan:\n"
for i, task in enumerate(plan, 1):
status_icon = {
TaskStatus.PENDING: "○",
TaskStatus.IN_PROGRESS: "◐",
TaskStatus.COMPLETED: "●",
TaskStatus.FAILED: "✗",
TaskStatus.CANCELLED: "⊘"
}.get(task.status, "?")
visualization += f"{i}. {status_icon} {task.name}\n"
if task.subtasks:
for j, subtask in enumerate(task.subtasks, 1):
subtask_icon = {
TaskStatus.PENDING: "○",
TaskStatus.IN_PROGRESS: "◐",
TaskStatus.COMPLETED: "●",
TaskStatus.FAILED: "✗",
TaskStatus.CANCELLED: "⊘"
}.get(subtask.status, "?")
visualization += f" {j}. {subtask_icon} {subtask.name}\n"
return visualization
使用示例
planner = HierarchicalTaskPlanner()
添加主任务
main_task = Task(
id="main",
name="Develop User Authentication System",
description="Develop a complete user authentication system",
priority=TaskPriority.HIGH,
metadata={'type': 'code_generation'}
)
planner.add_task(main_task)
分解任务
planner.decompose_task("main", max_depth=2)
获取执行计划
plan = planner.get_execution_plan("main")
print("Execution Plan:")
for task in plan:
print(f" - {task.name}")
可视化计划
print("\n" + planner.visualize_plan("main"))
2. 动态任务调整#
示例:动态任务调整
用户请求:
"实现动态任务调整算法"
Claude Code 生成的代码:
`python
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import logging
logger = logging.getLogger(__name__)
class DynamicTaskPlanner(HierarchicalTaskPlanner):
"""动态任务规划器"""
def __init__(self):
super().__init__()
self.execution_history: Dict[str, List[Dict[str, Any]]] = {}
self.performance_metrics: Dict[str, Dict[str, float]] = {}
def monitor_execution(self, task_id: str, status: TaskStatus, result: Optional[Any] = None, error: Optional[str] = None):
"""监控任务执行"""
task = self.tasks.get(task_id)
if not task:
return
# 更新任务状态
task.status = status
task.result = result
task.error = error
# 记录执行历史
if task_id not in self.execution_history:
self.execution_history[task_id] = []
self.execution_history[task_id].append({
'timestamp': datetime.utcnow(),
'status': status,
'result': result,
'error': error
})
logger.info(f"Task {task_id} status updated to {status}")
def analyze_performance(self, task_id: str) -> Dict[str, float]:
"""分析任务性能"""
if task_id not in self.execution_history:
return {}
history = self.execution_history[task_id]
# 计算性能指标
metrics = {
'total_executions': len(history),
'success_rate': 0.0,
'avg_duration': 0.0,
'failure_rate': 0.0
}
successful = [h for h in history if h['status'] == TaskStatus.COMPLETED]
failed = [h for h in history if h['status'] == TaskStatus.FAILED]
if len(history) > 0:
metrics['success_rate'] = len(successful) / len(history)
metrics['failure_rate'] = len(failed) / len(history)
self.performance_metrics[task_id] = metrics
return metrics
def adjust_plan(self, task_id: str) -> List[Task]:
"""调整执行计划"""
task = self.tasks.get(task_id)
if not task:
return []
# 分析性能
metrics = self.analyze_performance(task_id)
# 根据性能调整计划
if metrics.get('failure_rate', 0) > 0.5:
# 失败率高,添加重试任务
return self._add_retry_tasks(task)
elif metrics.get('avg_duration', 0) > 3600:
# 执行时间长,优化任务
return self._optimize_tasks(task)
else:
# 正常情况,返回原计划
return self.get_execution_plan(task_id)
def _add_retry_tasks(self, task: Task) -> List[Task]:
"""添加重试任务"""
retry_task = Task(
id=f"{task.id}_retry",
name=f"Retry {task.name}",
description=f"Retry failed task: {task.description}",
priority=TaskPriority.HIGH,
dependencies=[task.id],
metadata={'type': 'retry', 'original_task_id': task.id}
)
self.add_task(retry_task)
# 更新任务图
self.task_graph[task.id].append(retry_task.id)
return self.get_execution_plan(task.id)
def _optimize_tasks(self, task: Task) -> List[Task]:
"""优化任务"""
# 分解长时间运行的任务
if task.subtasks:
# 已经有子任务,优化子任务
for subtask in task.subtasks:
if subtask.estimated_duration and subtask.estimated_duration > 1800:
self.decompose_task(subtask.id, max_depth=1)
else:
# 分解当前任务
self.decompose_task(task.id, max_depth=2)
return self.get_execution_plan(task.id)
def predict_completion_time(self, task_id: str) -> Optional[float]:
"""预测完成时间"""
plan = self.get_execution_plan(task_id)
total_time = 0.0
for task in plan:
if task.estimated_duration:
total_time += task.estimated_duration
elif task.id in self.performance_metrics:
# 使用历史平均时间
avg_duration = self.performance_metrics[task.id].get('avg_duration', 0)
total_time += avg_duration
return total_time if total_time > 0 else None
# 使用示例
planner = DynamicTaskPlanner()
# 添加任务
task = Task(
id="task1",
name="Process Large Dataset",
description="Process a large dataset",
priority=TaskPriority.HIGH,
estimated_duration=7200,
metadata={'type': 'data_processing'}
)
planner.add_task(task)
# 分解任务
planner.decompose_task("task1", max_depth=2)
# 模拟执行
planner.monitor_execution("task1", TaskStatus.IN_PROGRESS)
# 模拟失败
planner.monitor_execution("task1", TaskStatus.FAILED, error="Timeout")
# 分析性能
metrics = planner.analyze_performance("task1")
print(f"Performance metrics: {metrics}")
# 调整计划
adjusted_plan = planner.adjust_plan("task1")
print("\nAdjusted plan:")
for task in adjusted_plan:
print(f" - {task.name}")
```> **动态任务调整特点**:
> - 实时监控任务执行
> - 分析任务性能
> - 根据性能调整计划
> - 预测完成时间
```
## 规划算法优化
### 1. 并行任务调度
# 示例:并行任务调度
用户请求:
"实现并行任务调度算法"
Claude Code 生成的代码:
````python
`python
from typing import Dict, List, Set, Optional
from collections import defaultdict, deque
import logging
logger = logging.getLogger(__name__)
class ParallelTaskScheduler:
"""并行任务调度器"""
def __init__(self, max_workers: int = 4):
self.max_workers = max_workers
self.tasks: Dict[str, Task] = {}
self.dependencies: Dict[str, Set[str]] = defaultdict(set)
self.dependents: Dict[str, Set[str]] = defaultdict(set)
self.ready_tasks: Set[str] = set()
self.running_tasks: Set[str] = set()
self.completed_tasks: Set[str] = set()
def add_task(self, task: Task):
"""添加任务"""
self.tasks[task.id] = task
# 添加依赖关系
for dep_id in task.dependencies:
self.dependencies[task.id].add(dep_id)
self.dependents[dep_id].add(task.id)
# 如果没有依赖,标记为就绪
if not task.dependencies:
self.ready_tasks.add(task.id)
logger.info(f"Task added: {task.id}")
def get_ready_tasks(self) -> List[Task]:
"""获取就绪任务"""
available = self.ready_tasks - self.running_tasks
return [self.tasks[task_id] for task_id in available]
def start_task(self, task_id: str):
"""开始任务"""
if task_id in self.ready_tasks and task_id not in self.running_tasks:
self.running_tasks.add(task_id)
self.ready_tasks.remove(task_id)
logger.info(f"Task started: {task_id}")
def complete_task(self, task_id: str, result: Optional[Any] = None, error: Optional[str] = None):
"""完成任务"""
if task_id not in self.running_tasks:
logger.warning(f"Task not running: {task_id}")
return
# 更新任务状态
task = self.tasks[task_id]
task.status = TaskStatus.COMPLETED if not error else TaskStatus.FAILED
task.result = result
task.error = error
# 更新集合
self.running_tasks.remove(task_id)
self.completed_tasks.add(task_id)
# 检查依赖此任务的任务
for dependent_id in self.dependents[task_id]:
self.dependencies[dependent_id].remove(task_id)
# 如果所有依赖都完成,标记为就绪
if not self.dependencies[dependent_id]:
self.ready_tasks.add(dependent_id)
logger.info(f"Task completed: {task_id}")
def get_schedule(self) -> List[List[str]]:
"""获取调度计划"""
schedule = []
remaining = set(self.tasks.keys())
while remaining:
# 获取当前可以执行的任务
current_batch = []
for task_id in remaining:
task = self.tasks[task_id]
if all(dep in self.completed_tasks for dep in task.dependencies):
current_batch.append(task_id)
# 限制并发数
current_batch = current_batch[:self.max_workers]
if not current_batch:
logger.warning("Circular dependency detected")
break
schedule.append(current_batch)
# 更新剩余任务
remaining -= set(current_batch)
self.completed_tasks.update(current_batch)
return schedule
def visualize_schedule(self) -> str:
"""可视化调度"""
schedule = self.get_schedule()
visualization = "Task Schedule:\n"
for i, batch in enumerate(schedule, 1):
visualization += f"Batch {i} (parallel):\n"
for task_id in batch:
task = self.tasks[task_id]
visualization += f" - {task.name}\n"
return visualization
# 使用示例
scheduler = ParallelTaskScheduler(max_workers=3)
# 添加任务
tasks = [
Task(id="A", name="Task A", description="Task A", priority=TaskPriority.HIGH),
Task(id="B", name="Task B", description="Task B", priority=TaskPriority.HIGH, dependencies=["A"]),
Task(id="C", name="Task C", description="Task C", priority=TaskPriority.HIGH, dependencies=["A"]),
Task(id="D", name="Task D", description="Task D", priority=TaskPriority.HIGH, dependencies=["B", "C"]),
Task(id="E", name="Task E", description="Task E", priority=TaskPriority.HIGH, dependencies=["B"]),
Task(id="F", name="Task F", description="Task F", priority=TaskPriority.HIGH, dependencies=["C"]),
Task(id="G", name="Task G", description="Task G", priority=TaskPriority.HIGH, dependencies=["D", "E", "F"])
]
for task in tasks:
scheduler.add_task(task)
# 获取调度计划
schedule = scheduler.get_schedule()
print("Schedule:")
for i, batch in enumerate(schedule, 1):
print(f"Batch {i}: {[scheduler.tasks[task_id].name for task_id in batch]}")
# 可视化调度
print("\n" + scheduler.visualize_schedule())
```> **并行任务调度特点**:
> - 识别可并行执行的任务
> - 限制并发任务数量
> - 处理任务依赖关系
> - 生成调度计划
```
## 总结
自主规划算法包括:
1. **规划算法的基本概念**: 什么是自主规划、规划算法分类
2. **任务分解算法**: 层次任务分解、动态任务调整
3. **规划算法优化**: 并行任务调度
通过自主规划算法,Claude Code可以自主分解任务、制定执行计划,并根据实际情况动态调整。
在下一节中,我们将探讨记忆系统设计。
```